package com.zeyu.framework.core.configuration;

import com.zeyu.framework.tools.mq.receiver.LowestMessageListener;
import com.zeyu.framework.tools.mq.receiver.RPCMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

/**
 * Spring configuration that declares the RabbitMQ (AMQP) infrastructure used by the framework:
 * the admin, a retry template, three {@link RabbitTemplate} flavours (default, fanout, RPC),
 * the queues/exchanges/bindings, and the listener containers that consume from them.
 *
 * <p>RabbitMQ is a message broker implementing AMQP. It stores and forwards messages between
 * components so that producers and consumers do not need to know about each other. Unlike
 * ActiveMQ it does not fully implement JMS.
 *
 * <p>Bean method names double as bean names, so they are part of this class's contract and are
 * kept exactly as they are (including the historical spellings {@code faoutRabbitTemplate} and
 * {@code bindingHeat}).
 *
 * Created by zeyuphoenix on 2016/12/30.
 */
@Configuration
// @EnableRabbit is used together with @Configuration; it turns on detection of @RabbitListener
// on beans registered in the container.
@EnableRabbit
public class RabbitMqConfiguration {

    // ================================================================
    // Constants
    // ================================================================

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfiguration.class);

    /**
     * Reply timeout, in milliseconds, applied to every template built by this class (10 minutes).
     */
    private static final long REPLY_TIMEOUT_MILLIS = 600000L;

    // A complete messaging channel is normally described by a queue, an exchange and a routing key.
    // Heartbeat channel
    public static final String QUEUE_HEART = "heart-queue";
    public static final String ROUTING_KEY_HEART = "heart-key";

    // Common information channel
    public static final String QUEUE_COMMON = "common-queue";
    public static final String EXCHANGE_COMMON = "common-exchange";
    public static final String ROUTING_KEY_COMMON = "common-key";
    // Publish/subscribe channel
    public static final String QUEUE_FANOUT = "fanout-queue";
    public static final String QUEUE_FANOUT_SEC = "fanout-sec-queue";
    public static final String EXCHANGE_FANOUT = "fanout-exchange";
    // RPC request / response channel
    public static final String QUEUE_RPC_REQUEST = "rpc-request-queue";
    public static final String QUEUE_RPC_RESPONSE = "rpc-response-queue";

    // Document-conversion channel
    public static final String QUEUE_DOC_CONVERT = "doc-convert-queue";
    public static final String ROUTING_DOC_CONVERT = "doc-convert-key";

    // ================================================================
    // Fields
    // ================================================================

    // No fields: the ConnectionFactory (built from the application's configuration) is handed to
    // each @Bean method as a parameter, so nothing needs to be held on the instance.

    // ================================================================
    // Constructors
    // ================================================================

    // ================================================================
    // Methods from/for super Interfaces or SuperClass
    // ================================================================

    // ================================================================
    // Public or Protected Methods
    // ================================================================

    /**
     * Creates the AMQP administrator used for basic broker set-up operations.
     *
     * @param connectionFactory connection factory the admin operates through
     * @return a {@link RabbitAdmin} bound to {@code connectionFactory}
     */
    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Creates the retry template shared by all templates of this class.
     *
     * <p>The back-off is exponential: the first wait is the initial interval (500 ms) and each
     * following wait is {@code min(previous * multiplier, maxInterval)}, i.e. 500 ms and then
     * 5000 ms with the values below. No retry policy is set here, so the number of attempts is
     * whatever {@link RetryTemplate} defaults to (the original author's note says three —
     * TODO confirm against the spring-retry version in use).
     *
     * <p>In a clustered set-up this lets a send be retried against another available broker
     * when one node goes down.
     *
     * @return the configured retry template
     */
    @Bean
    public RetryTemplate retryTemplate() {
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(500);
        backOff.setMaxInterval(5000);
        backOff.setMultiplier(10.0);

        RetryTemplate template = new RetryTemplate();
        template.setBackOffPolicy(backOff);
        return template;
    }

    /**
     * Default template: no exchange, queue or routing key is preset, so callers supply them.
     *
     * @param connectionFactory connection factory the template sends through
     * @param retryTemplate     retry behaviour applied to the template's operations
     * @return the default {@link RabbitTemplate}
     */
    @Bean(name = "rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, RetryTemplate retryTemplate) {
        logger.info("init normal template, if not set info, will use it.");
        return newTemplate(connectionFactory, retryTemplate);
    }

    /**
     * Template for publish/subscribe: its default exchange is {@link #EXCHANGE_FANOUT}.
     *
     * <p>The bean name is spelled "faout" for historical reasons; it is kept because other code
     * may look the bean up by this name.
     *
     * @param connectionFactory connection factory the template sends through
     * @param retryTemplate     retry behaviour applied to the template's operations
     * @return a {@link RabbitTemplate} publishing to the fanout exchange by default
     */
    @Bean(name = "faoutRabbitTemplate")
    public RabbitTemplate faoutRabbitTemplate(ConnectionFactory connectionFactory, RetryTemplate retryTemplate) {
        logger.info("init fanout template, use for publish and subscribe.");
        RabbitTemplate fanoutTemplate = newTemplate(connectionFactory, retryTemplate);
        fanoutTemplate.setExchange(EXCHANGE_FANOUT);
        return fanoutTemplate;
    }

    /**
     * Template for synchronous RPC request/response; replies are addressed to
     * {@link #QUEUE_RPC_RESPONSE}. (An AsyncRabbitTemplate could be used instead.)
     *
     * @param connectionFactory connection factory the template sends through
     * @param retryTemplate     retry behaviour applied to the template's operations
     * @return a {@link RabbitTemplate} with its reply address set to the RPC response queue
     */
    @Bean(name = "rpcRabbitTemplate")
    public RabbitTemplate rpcRabbitTemplate(ConnectionFactory connectionFactory, RetryTemplate retryTemplate) {
        logger.info("init rpc template");
        RabbitTemplate rpcTemplate = newTemplate(connectionFactory, retryTemplate);
        rpcTemplate.setReplyAddress(QUEUE_RPC_RESPONSE);
        return rpcTemplate;
    }

    /**
     * Client side of RPC: a container that consumes the response queue and hands each reply to
     * the RPC template, which acts as the message listener.
     *
     * @param connectionFactory connection factory the container consumes through
     * @param retryTemplate     passed on to {@link #rpcRabbitTemplate} to obtain the RPC template
     * @return the reply listener container
     */
    @Bean
    public SimpleMessageListenerContainer clientListenerContainer(ConnectionFactory connectionFactory,
                                                                  RetryTemplate retryTemplate) {
        SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer();
        replyContainer.setConnectionFactory(connectionFactory);
        replyContainer.setQueues(responseQueue());
        // NOTE(review): these direct @Bean method calls are expected to resolve to the shared
        // beans through @Configuration proxying rather than create new instances — confirm if
        // the proxy mode of this class is ever changed.
        replyContainer.setMessageListener(rpcRabbitTemplate(connectionFactory, retryTemplate));

        return replyContainer;
    }

    /**
     * Server side of RPC: a container that consumes the request queue and delegates each
     * request to a new {@link RPCMessageHandler} wrapped in a {@link MessageListenerAdapter}.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the request listener container
     */
    @Bean
    public SimpleMessageListenerContainer serverListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer requestContainer = new SimpleMessageListenerContainer();
        requestContainer.setConnectionFactory(connectionFactory);
        requestContainer.setQueues(requestQueue());
        requestContainer.setMessageListener(new MessageListenerAdapter(new RPCMessageHandler()));
        return requestContainer;
    }


    // Queue declarations. A queue that is declared without an explicit binding is reachable
    // through the broker's default exchange.
    // Queues can be configured as durable, exclusive and auto-delete:
    // durable:     whether the queue is persisted
    // exclusive:   private queue usable only by its creator, removed when the creator disconnects
    // auto_delete: whether the queue is removed once all consuming clients have disconnected
    // The boolean passed to the Queue constructor below is the durable flag.

    /**
     * Heartbeat queue (not durable).
     */
    @Bean
    public Queue heartQueue() {
        return new Queue(QUEUE_HEART, false);
    }

    /**
     * Common information queue (durable).
     */
    @Bean
    public Queue commonQueue() {
        return new Queue(QUEUE_COMMON, true);
    }

    /**
     * Document-conversion queue (durable).
     */
    @Bean
    public Queue convertQueue() {
        return new Queue(QUEUE_DOC_CONVERT, true);
    }

    /**
     * First publish/subscribe queue (durable).
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue(QUEUE_FANOUT, true);
    }

    /**
     * Second publish/subscribe queue (durable).
     */
    @Bean
    public Queue fanoutSecQueue() {
        return new Queue(QUEUE_FANOUT_SEC, true);
    }

    /**
     * RPC request queue (durable).
     */
    @Bean
    public Queue requestQueue() {
        return new Queue(QUEUE_RPC_REQUEST, true);
    }

    /**
     * RPC response queue (durable).
     */
    @Bean
    public Queue responseQueue() {
        return new Queue(QUEUE_RPC_RESPONSE, true);
    }

    /**
     * Direct exchange named {@link #EXCHANGE_COMMON}.
     *
     * <p>An exchange accepts messages together with routing information and delivers them to
     * queues. A direct exchange pairs an exchange, a queue and a binding key one-to-one and is
     * used here for point-to-point messaging.
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_COMMON);
    }

    /**
     * Fanout exchange named {@link #EXCHANGE_FANOUT}, used for publish/subscribe: it forwards
     * each message to every queue bound to it.
     *
     * <p>Because RabbitMQ's publish/subscribe model is built from several queues with several
     * subscribers, the exchange does not have to be bound in advance; a consumer can declare its
     * own queue and bind it to this exchange.
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_FANOUT);
    }

    // A topic exchange (TopicExchange) would route by routing-key pattern and is the most
    // flexible option; none is declared here.

    /**
     * Binds the common queue to the direct exchange under {@link #ROUTING_KEY_COMMON}.
     */
    @Bean
    public Binding bindingCommon(Queue commonQueue, DirectExchange exchange) {
        return BindingBuilder.bind(commonQueue).to(exchange).with(ROUTING_KEY_COMMON);
    }

    /**
     * Binds the heartbeat queue to the direct exchange under {@link #ROUTING_KEY_HEART}.
     */
    @Bean
    public Binding bindingHeat(Queue heartQueue, DirectExchange exchange) {
        return BindingBuilder.bind(heartQueue).to(exchange).with(ROUTING_KEY_HEART);
    }

    /**
     * Binds the document-conversion queue to the direct exchange under
     * {@link #ROUTING_DOC_CONVERT}.
     */
    @Bean
    public Binding bindingConvert(Queue convertQueue, DirectExchange exchange) {
        return BindingBuilder.bind(convertQueue).to(exchange).with(ROUTING_DOC_CONVERT);
    }

    /**
     * Binds the first subscriber queue to the fanout exchange.
     */
    @Bean
    public Binding bindingFanout(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }

    /**
     * Binds the second subscriber queue to the fanout exchange.
     */
    @Bean
    public Binding bindingFanoutSec(Queue fanoutSecQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutSecQueue).to(fanoutExchange);
    }

    /**
     * Listener container for the common and heartbeat queues, using manual acknowledgement.
     *
     * <p>Per the original author's note this container has the lowest priority once
     * annotation-driven listeners are in use.
     *
     * @param connectionFactory connection factory the container consumes through
     * @param listenerAdapter   listener receiving the messages; with manual acknowledgement it
     *                          is presumably responsible for acking on the channel — verify in
     *                          {@link LowestMessageListener}
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                    LowestMessageListener listenerAdapter) {

        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        // Consume from the common and heartbeat queues.
        listenerContainer.setQueueNames(QUEUE_COMMON, QUEUE_HEART);
        // Acknowledgements are issued manually rather than by the container.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setExposeListenerChannel(true);
        listenerContainer.setMessageListener(listenerAdapter);

        return listenerContainer;
    }

    // ================================================================
    // Getter & Setter
    // ================================================================

    // ================================================================
    // Private Methods
    // ================================================================

    /**
     * Builds a template with the settings common to every template of this class: the shared
     * reply timeout and the given retry template.
     *
     * @param connectionFactory connection factory the template sends through
     * @param retryTemplate     retry behaviour applied to the template's operations
     * @return a new, otherwise unconfigured {@link RabbitTemplate}
     */
    private static RabbitTemplate newTemplate(ConnectionFactory connectionFactory, RetryTemplate retryTemplate) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyTimeout(REPLY_TIMEOUT_MILLIS);
        template.setRetryTemplate(retryTemplate);
        return template;
    }

    // ================================================================
    // Inner or Anonymous Class
    // ================================================================

    // ================================================================
    // Test Methods
    // ================================================================

}
